package com.rtsapp.server.network.protocol.rpc.client.impl;

import com.rtsapp.server.logger.Logger;
import com.rtsapp.server.network.protocol.ProtocolConstants;
import com.rtsapp.server.network.protocol.rpc.client.IPushHandler;
import com.rtsapp.server.network.protocol.rpc.codec.*;
import com.rtsapp.server.network.session.Session;
import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled;
import io.netty.channel.Channel;
import io.netty.channel.ChannelFutureListener;
import io.netty.channel.ChannelHandler.Sharable;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandlerAdapter;

import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicLong;


/**
 * Inbound handler for one RPC client connection.
 * <ol>
 *   <li>Sends RPC requests and tracks the futures awaiting their responses.</li>
 *   <li>Receives responses and pushes from the server and dispatches them.</li>
 * </ol>
 *
 * <p>NOTE(review): the class is annotated {@code @Sharable} but keeps per-connection
 * state (the channel and the pending futures) — confirm that one instance is never
 * installed in more than one pipeline at the same time.
 */
@Sharable
public class RPCClientInHandler extends ChannelInboundHandlerAdapter {

	private static final Logger logger =com.rtsapp.server.logger.LoggerFactory.getLogger( RPCClientInHandler.class );

	// All RPCFutures still waiting for a response, keyed by request sequence number
	private final ConcurrentMap<Long, RPCFuture> rpcFutures =  new ConcurrentHashMap<Long, RPCFuture >();
	// Source of request sequence numbers
	private final AtomicLong sequenceNoGenerator = new AtomicLong( 0 );
	// Ensures a single connection failure asks the proxy to reconnect only once:
	// exceptionCaught() closes the channel, which in turn triggers channelInactive()
	private final AtomicBoolean reconnectRequested = new AtomicBoolean( false );

	// The channel this handler is bound to. Written in channelRegistered() and read by
	// sendRPC()/close()/sendRegisterClientId(); volatile because those callers are
	// presumably not always on the channel's event loop thread — TODO confirm
	private volatile Channel channel;
	private final RPCClientProxy rpcProxy;

	/**
	 * @param proxy the owning proxy; supplies the client id and push handler and is
	 *              asked to reconnect when this connection fails
	 */
	public RPCClientInHandler( RPCClientProxy proxy  ){
		this.rpcProxy = proxy;
	}

	/**
	 * Remembers the channel so requests can be written to it later, and re-arms the
	 * reconnect guard for this (new) registration.
	 */
	@Override
	public void channelRegistered(ChannelHandlerContext ctx) throws Exception {
		super.channelRegistered(ctx);
		channel = ctx.channel();
		reconnectRequested.set( false );
	}

	@Override
	public void channelActive(ChannelHandlerContext ctx) throws Exception {
		super.channelActive(ctx);
	}

	/**
	 * The connection dropped: propagate the event, then ask the proxy to reconnect.
	 */
	@Override
	public void channelInactive(ChannelHandlerContext ctx) throws Exception {
		super.channelInactive(ctx);

		reconnect();
	}

	/**
	 * Business-logic exceptions are expected to be caught in their own handlers, so
	 * anything that reaches this method is treated as unrecoverable: the connection
	 * is closed and a reconnect is requested.
	 */
	@Override
	public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {

		super.exceptionCaught(ctx, cause);
		logger.error("RPCClientInHandler exception from downstream.", cause);

		ctx.close();
		// Closing may already have triggered channelInactive() -> reconnect();
		// the guard inside reconnect() makes this second call a no-op in that case.
		reconnect();
	}

	/**
	 * Asks the proxy to drop this handler and reconnect. Only the first call after a
	 * channel registration has any effect.
	 */
	private void reconnect( ){
		if( reconnectRequested.compareAndSet( false, true ) ){
			rpcProxy.removeHandlerAndReconnect(this);
		}
	}

	/**
	 * Flushes pending writes and then closes the channel. Does nothing if the handler
	 * has not been registered with a channel yet.
	 */
	public void close(){
		Channel ch = this.channel;
		if( ch == null ){
			return;
		}
		ch.writeAndFlush(Unpooled.EMPTY_BUFFER).addListener(ChannelFutureListener.CLOSE);
	}

	/**
	 * Decodes one inbound frame and dispatches it: an {@code RPCResponse} completes the
	 * matching pending future, an {@code RPCPush} is handed to the push handler.
	 * Decoding/dispatch errors are logged and not propagated. The buffer is always
	 * released.
	 */
	@Override
	public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {

		if( ! (msg instanceof  ByteBuf ) ){
			logger.error("RPCClientInHandler channelRead 错误, msg 不是ByteBuf" );
			// Not ours to handle: pass it on instead of silently dropping it, so a
			// reference-counted message is released further down the pipeline.
			ctx.fireChannelRead( msg );
			return;
		}

		ByteBuf buf = (ByteBuf) msg;

		try{
			// The leading int (message size) must be consumed before deserializing;
			// its value is not needed here, but the read must not be removed.
			buf.readInt();

			Object message = KryoSerializer.read(buf);

			if( message instanceof RPCResponse){
				// Response: atomically claim the pending future so it completes at most once
				RPCResponse response = (RPCResponse)message;

				RPCFuture rpcFuture = rpcFutures.remove( response.getSequenceNo() );
				if( rpcFuture != null ){
					rpcFuture.done( response );
				}
			}else if( message instanceof RPCPush){
				// Server push
				processPush( ctx.channel(), (RPCPush)message);
			}else{
				logger.error( "不是一个合法的RPC包" );
			}

		} catch (Throwable ex) {
			logger.error("RPCClientInHandler 消息处理异常", ex);
		}finally {
			buf.release();
		}

	}

	/**
	 * Assigns a fresh sequence number to the request, registers a future for its
	 * response and writes the request to the channel.
	 *
	 * @param rpcRequest the request to send; its sequence number is overwritten
	 * @return the future that is completed when the matching response arrives
	 * @throws IllegalStateException if the handler has not been registered with a
	 *         channel yet (previously this surfaced as a NullPointerException after
	 *         the future had already been registered)
	 */
	public RPCFuture sendRPC( RPCRequest rpcRequest ){

		Channel ch = this.channel;
		if( ch == null ){
			// Fail before touching any state so no orphaned future is left in the map
			throw new IllegalStateException( "RPC channel is not registered yet" );
		}

		long sequenceNo = sequenceNoGenerator.incrementAndGet();
		rpcRequest.setSequenceNo(sequenceNo);
		RPCContext ctx = new RPCContext( );
		ctx.setRequest(rpcRequest);

		RPCFuture future = new RPCFuture( ctx, this );

		// Register before writing so a fast response always finds its future
		this.rpcFutures.put( sequenceNo , future );

		ch.writeAndFlush( rpcRequest );

		return future;
	}

	/**
	 * Registers this client's id with the RPC server. Does nothing when the proxy has
	 * no client id or the channel is not available yet.
	 */
	void sendRegisterClientId(){

		String clientId =  rpcProxy.getClientId();
		Channel ch = this.channel;

		if( clientId != null && ch != null ) {
			RPCRegister register = new RPCRegister();
			register.setClientId( clientId );
			ch.writeAndFlush(register);
		}

	}

	/**
	 * Hands a server push to the proxy's push handler and writes back an
	 * {@code RPCPushResponse} carrying either the handler's result (status ok) or the
	 * thrown exception (status exception). Without a push handler the push is only
	 * logged and no response is sent.
	 *
	 * @param ch   channel the response is written to
	 * @param push the push received from the server
	 */
	private void processPush(Channel ch,  RPCPush push ){

		IPushHandler pushHandler = rpcProxy.getPushHandler();

		if( pushHandler != null ){
			try {
				Object result = pushHandler.processPush( push.getMessage() );

				RPCPushResponse response = new RPCPushResponse();
				response.setSequenceNo( push.getSequenceNo() );
				response.setResult( result );
				response.setErrorNo( ProtocolConstants.RPCConstants.RPCStatus.ok );

				ch.writeAndFlush( response );

			}catch( Throwable ex ){
				logger.error( "processPush error : ", ex );

				// Report the failure to the server; the throwable itself is the result
				RPCPushResponse response = new RPCPushResponse();
				response.setSequenceNo( push.getSequenceNo() );
				response.setResult( ex );
				response.setErrorNo( ProtocolConstants.RPCConstants.RPCStatus.exception );

				ch.writeAndFlush( response );
			}
		}else{
			logger.error( "收到推送消息, 但是没有对应的处理接口" );
		}
	}

}
